// Copyright 2021 Vectorized, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package v1alpha1

import (
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/validation/field"
	ctrl "sigs.k8s.io/controller-runtime"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/webhook"
)

// Binary size units in bytes: each constant is 1024 times the previous one.
const (
	kb = 1 << (10 * (iota + 1)) // 1024
	mb                          // 1024 * kb
	gb                          // 1024 * mb
)

// log is the package-level logger used by the Cluster webhook handlers in this
// file; it is derived from the controller-runtime root logger with the name
// "cluster-resource".
var log = logf.Log.WithName("cluster-resource")

// SetupWebhookWithManager builds the webhook for the Cluster type through the
// controller-runtime webhook builder attached to mgr and returns any error
// reported while completing the builder. (Scaffolded by kubebuilder.)
func (r *Cluster) SetupWebhookWithManager(mgr ctrl.Manager) error {
	builder := ctrl.NewWebhookManagedBy(mgr).For(r)
	return builder.Complete()
}

//+kubebuilder:webhook:path=/mutate-redpanda-vectorized-io-v1alpha1-cluster,mutating=true,failurePolicy=fail,sideEffects=None,groups=redpanda.vectorized.io,resources=clusters,verbs=create;update,versions=v1alpha1,name=mcluster.kb.io,admissionReviewVersions={v1,v1beta1}

// Compile-time check that *Cluster satisfies webhook.Defaulter.
var _ webhook.Defaulter = &Cluster{}

// Default implements webhook.Defaulter so a webhook will be registered for the type.
// At the moment it only logs the name of the cluster; no field is modified.
// TODO(user): fill in your defaulting logic.
func (r *Cluster) Default() {
	clusterName := r.Name
	log.Info("default", "name", clusterName)
}

// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
//+kubebuilder:webhook:path=/validate-redpanda-vectorized-io-v1alpha1-cluster,mutating=false,failurePolicy=fail,sideEffects=None,groups=redpanda.vectorized.io,resources=clusters,verbs=create;update,versions=v1alpha1,name=vcluster.kb.io,admissionReviewVersions={v1,v1beta1}

// Compile-time check that *Cluster satisfies webhook.Validator.
var _ webhook.Validator = &Cluster{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type.
// It runs every spec check in a fixed order, collects all field errors, and
// returns them wrapped in a single Invalid API error; nil means the spec passed.
func (r *Cluster) ValidateCreate() error {
	log.Info("validate create", "name", r.Name)

	// The order of the checks determines the order of the reported errors.
	checks := []func() field.ErrorList{
		r.validateKafkaListeners,
		r.validateAdminListeners,
		r.validatePandaproxyListeners,
		r.checkCollidingPorts,
		r.validateMemory,
		r.validateCPU,
		r.validateArchivalStorage,
	}

	var errs field.ErrorList
	for _, check := range checks {
		errs = append(errs, check()...)
	}

	if len(errs) > 0 {
		return apierrors.NewInvalid(r.GroupVersionKind().GroupKind(), r.Name, errs)
	}
	return nil
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type.
// It rejects updates that lower spec.replicas and then runs the listener,
// port-collision, memory, CPU and archival storage checks on the new spec.
// All field errors are collected and returned as a single Invalid API error;
// nil means the update is accepted.
//
// old must be the previous *Cluster. Any other (or nil) object is reported as a
// BadRequest error instead of panicking inside the admission handler.
func (r *Cluster) ValidateUpdate(old runtime.Object) error {
	log.Info("validate update", "name", r.Name)

	oldCluster, ok := old.(*Cluster)
	if !ok || oldCluster == nil {
		return apierrors.NewBadRequest("expected the old object to be a Cluster")
	}

	var allErrs field.ErrorList

	// Replicas may only stay the same or grow; the comparison is skipped when
	// either side leaves replicas unset.
	if r.Spec.Replicas != nil && oldCluster.Spec.Replicas != nil && *r.Spec.Replicas < *oldCluster.Spec.Replicas {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("replicas"),
				r.Spec.Replicas,
				"scaling down is not supported"))
	}

	// The order of the checks determines the order of the reported errors.
	checks := []func() field.ErrorList{
		r.validateKafkaListeners,
		r.validateAdminListeners,
		r.validatePandaproxyListeners,
		r.checkCollidingPorts,
		r.validateMemory,
		r.validateCPU,
		r.validateArchivalStorage,
	}
	for _, check := range checks {
		allErrs = append(allErrs, check()...)
	}

	if len(allErrs) == 0 {
		return nil
	}

	return apierrors.NewInvalid(
		r.GroupVersionKind().GroupKind(),
		r.Name, allErrs)
}

// ReserveMemoryString is the amount of memory that we reserve for processes
// other than redpanda in the container. It is written as a Kubernetes resource
// quantity and parsed with resource.MustParse in validateMemory.
// NOTE(review): in quantity notation "M" is the decimal suffix (10^6 bytes),
// not "Mi" (2^20 bytes) — confirm that this is the intended unit.
const ReserveMemoryString = "1M"

// validateAdminListeners checks spec.configuration.adminApi. It requires
// exactly one listener when there is no external admin listener and exactly
// two when there is one, forbids an explicit port on the external listener,
// allows TLS to be enabled on at most one listener, and validates the TLS
// settings of every listener.
func (r *Cluster) validateAdminListeners() field.ErrorList {
	var errs field.ErrorList
	external := r.AdminAPIExternal()
	listeners := r.Spec.Configuration.AdminAPI
	adminPath := field.NewPath("spec").Child("configuration").Child("adminApi")

	// One listener is always expected; an external listener raises the count to two.
	expected := 1
	if external != nil {
		expected++
	}
	if len(listeners) != expected {
		errs = append(errs, field.Invalid(adminPath, listeners,
			"need exactly one internal API listener and up to one external"))
	}

	if external != nil && external.Port != 0 {
		errs = append(errs, field.Invalid(adminPath, listeners,
			"external admin listener cannot have port specified"))
	}

	// for now only one listener can have TLS to be backward compatible with v1alpha1 API
	tlsSeen := false
	for i := range listeners {
		tlsConfig := listeners[i].TLS
		tlsPath := adminPath.Index(i).Child("tls")
		if tlsConfig.Enabled {
			if tlsSeen {
				errs = append(errs, field.Invalid(tlsPath, tlsConfig,
					"only one listener can have TLS enabled"))
			}
			tlsSeen = true
		}
		// Every listener is validated, including those with TLS disabled, to
		// catch combinations such as !Enabled && RequireClientAuth.
		errs = append(errs, validateAdminTLS(tlsConfig, tlsPath)...)
	}
	return errs
}

// validateKafkaListeners checks spec.configuration.kafkaApi. It requires at
// least one listener, at most one listener marked as external, no explicit
// port on the external listener, TLS enabled on at most one listener, valid
// TLS settings on every listener, and a listener count of exactly one (no
// external listener) or exactly two (one of them external).
func (r *Cluster) validateKafkaListeners() field.ErrorList {
	var allErrs field.ErrorList
	kafkaPath := field.NewPath("spec").Child("configuration").Child("kafkaApi")
	listeners := r.Spec.Configuration.KafkaAPI

	if len(listeners) == 0 {
		allErrs = append(allErrs,
			field.Invalid(kafkaPath, listeners,
				"need at least one kafka api listener"))
	}

	// external ends up pointing at the last listener marked as external.
	var external *KafkaAPI
	for i := range listeners {
		if !listeners[i].External.Enabled {
			continue
		}
		if external != nil {
			allErrs = append(allErrs,
				field.Invalid(kafkaPath, listeners,
					"only one kafka api listener can be marked as external"))
		}
		external = &listeners[i]
		if external.Port != 0 {
			allErrs = append(allErrs,
				field.Invalid(kafkaPath, listeners,
					"external kafka api listener cannot have port specified, it's autogenerated"))
		}
	}

	// for now only one listener can have TLS to be backward compatible with v1alpha1 API
	foundListenerWithTLS := false
	for i := range listeners {
		tlsConfig := listeners[i].TLS
		tlsPath := kafkaPath.Index(i).Child("tls")
		if tlsConfig.Enabled {
			if foundListenerWithTLS {
				allErrs = append(allErrs,
					field.Invalid(tlsPath, tlsConfig,
						"only one listener can have TLS enabled"))
			}
			foundListenerWithTLS = true
		}
		// we need to run the validation on all listeners to also catch errors like !Enabled && RequireClientAuth
		allErrs = append(allErrs, validateTLS(tlsConfig, tlsPath)...)
	}

	// Accepted shapes: a single non-external listener, or two listeners of
	// which one is external.
	hasExternal := external != nil
	validCount := (hasExternal && len(listeners) == 2) || (!hasExternal && len(listeners) == 1)
	if !validCount {
		allErrs = append(allErrs,
			field.Invalid(kafkaPath, listeners,
				"one internal listener and up to one external kafka api listener is required"))
	}

	return allErrs
}

// validatePandaproxyListeners checks spec.configuration.pandaproxyApi.
// For every listener marked as external it verifies that it is the only
// external one, that no port is specified, that an enabled external kafka
// listener exists, and that its subdomain matches the external kafka
// listener's subdomain. It also allows TLS to be enabled on at most one
// listener, validates the TLS settings of every listener, and accepts only
// these shapes: no listeners, one non-external listener, or two listeners of
// which one is external.
func (r *Cluster) validatePandaproxyListeners() field.ErrorList {
	var allErrs field.ErrorList
	proxyPath := field.NewPath("spec").Child("configuration").Child("pandaproxyApi")
	listeners := r.Spec.Configuration.PandaproxyAPI
	kafkaExternal := r.ExternalListener()

	// proxyExternal ends up pointing at the last listener marked as external.
	var proxyExternal *PandaproxyAPI
	for i := range listeners {
		if !listeners[i].External.Enabled {
			continue
		}
		listenerPath := proxyPath.Index(i)
		if proxyExternal != nil {
			allErrs = append(allErrs,
				field.Invalid(listenerPath, listeners[i],
					"only one pandaproxy api listener can be marked as external"))
		}
		proxyExternal = &listeners[i]
		if proxyExternal.Port != 0 {
			allErrs = append(allErrs,
				field.Invalid(listenerPath, listeners[i],
					"external pandaproxy api listener cannot have port specified, it's autogenerated"))
		}
		// The current listener is known to be external here, so only the kafka
		// side needs checking.
		if kafkaExternal == nil || !kafkaExternal.External.Enabled {
			allErrs = append(allErrs,
				field.Invalid(listenerPath, listeners[i],
					"cannot have a pandaproxy external listener without a kafka external listener"))
		}
		if kafkaExternal == nil && proxyExternal.External.Subdomain != "" {
			allErrs = append(allErrs,
				field.Invalid(listenerPath, listeners[i],
					"kafka external listener is empty but must specify the same subdomain as that of the external pandaproxy"))
		}
		if kafkaExternal != nil && kafkaExternal.External.Subdomain != proxyExternal.External.Subdomain {
			allErrs = append(allErrs,
				field.Invalid(listenerPath, listeners[i],
					"subdomain of external pandaproxy must be the same as kafka's"))
		}
	}

	// for now only one listener can have TLS to be backward compatible with v1alpha1 API
	foundListenerWithTLS := false
	for i := range listeners {
		tlsConfig := listeners[i].TLS
		tlsPath := proxyPath.Index(i).Child("tls")
		if tlsConfig.Enabled {
			if foundListenerWithTLS {
				allErrs = append(allErrs,
					field.Invalid(tlsPath, tlsConfig,
						"only one pandaproxy listener can have TLS enabled"))
			}
			foundListenerWithTLS = true
		}
		// Every listener is validated, including those with TLS disabled.
		allErrs = append(allErrs, validatePandaproxyTLS(tlsConfig, tlsPath)...)
	}

	// If we have an external proxy listener and no other listeners, we're missing an internal one
	if proxyExternal != nil && len(listeners) == 1 {
		allErrs = append(allErrs,
			field.Invalid(proxyPath, listeners,
				"an internal pandaproxy listener is required when an external one is provided"))
	}

	if !((len(listeners) == 2 && proxyExternal != nil) || (proxyExternal == nil && len(listeners) <= 1)) {
		allErrs = append(allErrs,
			field.Invalid(proxyPath, listeners,
				"up to one internal listener and no external listener, or one external and one internal listener for pandaproxy is allowed"))
	}

	return allErrs
}

// validateCPU reports an error when both a CPU request and a CPU limit are
// set and the limit is lower than the request. When either of them is zero
// there is nothing to compare and no error is returned.
func (r *Cluster) validateCPU() field.ErrorList {
	requested := r.Spec.Resources.Requests.Cpu()
	limit := r.Spec.Resources.Limits.Cpu()

	if requested.IsZero() || limit.IsZero() {
		return nil
	}
	if limit.Cmp(*requested) >= 0 {
		return nil
	}

	cpuPath := field.NewPath("spec").Child("resources").Child("requests").Child("cpu")
	return field.ErrorList{
		field.Invalid(cpuPath, requested,
			"CPU limit cannot be lower than the request, either increase the limit or remove it"),
	}
}

// validateMemory verifies that the memory request is aligned with the minimal
// requirements of redpanda. All checks are skipped in developer mode.
// Otherwise it reports an error when:
//   - the memory request, minus the amount reserved for other processes
//     (ReserveMemoryString), is below 1GB;
//   - the memory request is below MinimumMemoryPerCore for each requested
//     core (2GB per core according to the error message);
//   - a memory limit is set and it is lower than the memory request.
func (r *Cluster) validateMemory() field.ErrorList {
	var allErrs field.ErrorList
	if r.Spec.Configuration.DeveloperMode {
		return allErrs
	}

	memoryPath := field.NewPath("spec").Child("resources").Child("requests").Child("memory")
	requestedMemory := r.Spec.Resources.Requests.Memory()

	// Ensure spare memory for other processes
	reserved := resource.MustParse(ReserveMemoryString)
	if requestedMemory.Value()-reserved.Value() < gb {
		allErrs = append(allErrs,
			field.Invalid(
				memoryPath,
				// Report the request, which is the field this error points at.
				requestedMemory,
				"need minimum request of 1GB + 1MB of memory"))
	}

	// Ensure a requested 2GB of memory per core. Quantity.Value rounds up to
	// the next integer, so a fractional CPU request counts as a full core.
	requestedCores := r.Spec.Resources.Requests.Cpu().Value()
	if requestedMemory.Value() < requestedCores*MinimumMemoryPerCore {
		allErrs = append(allErrs,
			field.Invalid(
				memoryPath,
				requestedMemory,
				"need 2GB of memory per core; need to decrease the requested CPU or increase the memory request"))
	}

	// Memory limit (if set) cannot be lower than the requested
	limit := r.Spec.Resources.Limits.Memory()
	if !limit.IsZero() && limit.Cmp(*requestedMemory) == -1 {
		allErrs = append(allErrs,
			field.Invalid(
				memoryPath,
				requestedMemory,
				"Memory limit cannot be lower than the request, either increase the limit or remove it"))
	}

	return allErrs
}

// validateAdminTLS reports an error under path when client authentication is
// required although TLS itself is not enabled on the admin API listener.
func validateAdminTLS(tlsConfig AdminAPITLS, path *field.Path) field.ErrorList {
	if tlsConfig.Enabled || !tlsConfig.RequireClientAuth {
		return nil
	}
	return field.ErrorList{
		field.Invalid(
			path.Child("requireclientauth"),
			tlsConfig.RequireClientAuth,
			"Enabled has to be set to true for RequireClientAuth to be allowed to be true"),
	}
}
// validateTLS validates the TLS settings of a kafka API listener. It reports
// an error under path when client authentication is required while TLS is
// disabled, and when both IssuerRef and NodeSecretRef are provided.
func validateTLS(tlsConfig KafkaAPITLS, path *field.Path) field.ErrorList {
	var errs field.ErrorList

	clientAuthWithoutTLS := !tlsConfig.Enabled && tlsConfig.RequireClientAuth
	if clientAuthWithoutTLS {
		errs = append(errs, field.Invalid(
			path.Child("requireclientauth"),
			tlsConfig.RequireClientAuth,
			"Enabled has to be set to true for RequireClientAuth to be allowed to be true"))
	}

	bothRefsSet := tlsConfig.NodeSecretRef != nil && tlsConfig.IssuerRef != nil
	if bothRefsSet {
		errs = append(errs, field.Invalid(
			path.Child("nodeSecretRef"),
			tlsConfig.NodeSecretRef,
			"Cannot provide both IssuerRef and NodeSecretRef"))
	}

	return errs
}

// validatePandaproxyTLS reports an error under path when client
// authentication is required although TLS itself is not enabled on the
// pandaproxy API listener.
func validatePandaproxyTLS(
	tlsConfig PandaproxyAPITLS, path *field.Path,
) field.ErrorList {
	if tlsConfig.Enabled || !tlsConfig.RequireClientAuth {
		return nil
	}
	return field.ErrorList{
		field.Invalid(
			path.Child("requireclientauth"),
			tlsConfig.RequireClientAuth,
			"Enabled has to be set to true for RequireClientAuth to be allowed to be true"),
	}
}

// validateArchivalStorage checks the cloud storage settings. Nothing is
// validated while cloud storage is disabled; once it is enabled the access
// key, bucket, region and the name and namespace of the secret key reference
// must all be non-empty.
// NOTE(review): the reported field paths are rooted at
// spec.configuration.cloudStorage while the values are read from
// r.Spec.CloudStorage — confirm the paths against the JSON tags of the spec.
func (r *Cluster) validateArchivalStorage() field.ErrorList {
	var errs field.ErrorList
	if !r.Spec.CloudStorage.Enabled {
		return errs
	}

	cloudPath := field.NewPath("spec").Child("configuration").Child("cloudStorage")
	// requireSet records an invalid-field error when the checked value is empty.
	requireSet := func(empty bool, fieldPath *field.Path, value interface{}, detail string) {
		if empty {
			errs = append(errs, field.Invalid(fieldPath, value, detail))
		}
	}

	requireSet(r.Spec.CloudStorage.AccessKey == "",
		cloudPath.Child("accessKey"),
		r.Spec.CloudStorage.AccessKey,
		"AccessKey has to be provided for cloud storage to be enabled")
	requireSet(r.Spec.CloudStorage.Bucket == "",
		cloudPath.Child("bucket"),
		r.Spec.CloudStorage.Bucket,
		"Bucket has to be provided for cloud storage to be enabled")
	requireSet(r.Spec.CloudStorage.Region == "",
		cloudPath.Child("region"),
		r.Spec.CloudStorage.Region,
		"Region has to be provided for cloud storage to be enabled")
	requireSet(r.Spec.CloudStorage.SecretKeyRef.Name == "",
		cloudPath.Child("secretKeyRef", "name"),
		r.Spec.CloudStorage.SecretKeyRef.Name,
		"SecretKeyRef name has to be provided for cloud storage to be enabled")
	requireSet(r.Spec.CloudStorage.SecretKeyRef.Namespace == "",
		cloudPath.Child("secretKeyRef", "namespace"),
		r.Spec.CloudStorage.SecretKeyRef.Namespace,
		"SecretKeyRef namespace has to be provided for cloud storage to be enabled")

	return errs
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type.
// Deletion is always accepted: the call is logged and nil is returned.
func (r *Cluster) ValidateDelete() error {
	clusterName := r.Name
	log.Info("validate delete", "name", clusterName)

	// TODO(user): fill in your validation logic upon object deletion.
	return nil
}

// checkCollidingPorts reports an error for every pair of colliding ports among
// the kafka API listeners, the internal admin API listener, the internal
// pandaproxy API listener and the RPC server. The checks treat "port + 1" of
// an internal listener as the port of the matching external listener, which
// is not visible in the Cluster CR. Checks that involve the admin or
// pandaproxy API are skipped when the corresponding internal listener is not
// configured.
//
// nolint:funlen,gocyclo // this function needs rewriting once stabilized
func (r *Cluster) checkCollidingPorts() field.ErrorList {
	var allErrs field.ErrorList
	adminAPIInternal := r.AdminAPIInternal()
	adminAPIExternal := r.AdminAPIExternal()
	proxyAPIInternal := r.PandaproxyAPIInternal()
	proxyAPIExternal := r.PandaproxyAPIExternal()
	// Looked up once rather than on every loop iteration.
	hasKafkaExternal := r.ExternalListener() != nil

	kafkaListeners := r.Spec.Configuration.KafkaAPI
	rpcPort := r.Spec.Configuration.RPCServer.Port

	configPath := field.NewPath("spec")
	adminPortPath := configPath.Child("configuration", "adminApi", "port")
	rpcPortPath := configPath.Child("configuration", "rpcServer", "port")
	proxyPortPath := configPath.Child("configuration", "pandaproxyApi", "port")
	kafkaPortPath := configPath.Child("configuration", "kafkaApi", "port")

	// Kafka - Admin
	for _, kafka := range kafkaListeners {
		if adminAPIInternal != nil && adminAPIInternal.Port == kafka.Port {
			allErrs = append(allErrs,
				field.Invalid(adminPortPath,
					adminAPIInternal.Port,
					"admin port collide with Spec.Configuration.KafkaAPI Port"))
		}
		if adminAPIInternal != nil && adminAPIInternal.Port+1 == kafka.Port {
			allErrs = append(allErrs,
				field.Invalid(adminPortPath,
					adminAPIInternal.Port,
					"external admin port collide with Spec.Configuration.KafkaAPI Port"))
		}
	}

	// Kafka - RPC
	for _, kafka := range kafkaListeners {
		if rpcPort == kafka.Port {
			allErrs = append(allErrs,
				field.Invalid(rpcPortPath,
					rpcPort,
					"rpc port collide with Spec.Configuration.KafkaAPI Port"))
		}
	}

	// Kafka - Proxy
	for _, kafka := range kafkaListeners {
		if proxyAPIInternal != nil && proxyAPIInternal.Port == kafka.Port {
			allErrs = append(allErrs,
				field.Invalid(proxyPortPath,
					proxyAPIInternal.Port,
					"proxy port collides with Spec.Configuration.KafkaAPI Port"))
		}
		if proxyAPIInternal != nil && proxyAPIInternal.Port+1 == kafka.Port {
			allErrs = append(allErrs,
				field.Invalid(proxyPortPath,
					proxyAPIInternal.Port+1,
					"external proxy port collides with Spec.Configuration.KafkaAPI Port"))
		}
		if proxyAPIInternal != nil && proxyAPIInternal.Port == kafka.Port+1 {
			allErrs = append(allErrs,
				field.Invalid(proxyPortPath,
					proxyAPIInternal.Port,
					"proxy port collides with external Spec.Configuration.KafkaAPI Port"))
		}
		if proxyAPIInternal != nil && proxyAPIInternal.Port+1 == kafka.Port+1 {
			allErrs = append(allErrs,
				field.Invalid(proxyPortPath,
					proxyAPIInternal.Port+1,
					"external proxy port collides with external Spec.Configuration.KafkaAPI Port"))
		}
	}

	// Admin - RPC
	if adminAPIInternal != nil && adminAPIInternal.Port == rpcPort {
		allErrs = append(allErrs,
			field.Invalid(adminPortPath,
				adminAPIInternal.Port,
				"admin port collide with Spec.Configuration.RPCServer.Port"))
	}

	// Admin - Proxy
	if adminAPIInternal != nil && proxyAPIInternal != nil && adminAPIInternal.Port == proxyAPIInternal.Port {
		allErrs = append(allErrs,
			field.Invalid(adminPortPath,
				adminAPIInternal.Port,
				"admin port collides with Spec.Configuration.PandaproxyApi Port"))
	}

	// Proxy - RPC
	if proxyAPIInternal != nil && proxyAPIInternal.Port == rpcPort {
		allErrs = append(allErrs,
			field.Invalid(proxyPortPath,
				proxyAPIInternal.Port,
				"pandaproxy port collides with Spec.Configuration.RPCServer.Port"))
	}

	// Kafka Ext - RPC
	for _, kafka := range kafkaListeners {
		if hasKafkaExternal && kafka.Port+1 == rpcPort {
			allErrs = append(allErrs,
				field.Invalid(rpcPortPath,
					rpcPort,
					"rpc port collide with external Kafka API that is not visible in the Cluster CR"))
		}
	}

	// Kafka Ext - Admin
	for _, kafka := range kafkaListeners {
		if hasKafkaExternal && adminAPIInternal != nil && adminAPIInternal.Port == kafka.Port+1 {
			allErrs = append(allErrs,
				field.Invalid(adminPortPath,
					adminAPIInternal.Port,
					"admin port collide with external Kafka API that is not visible in the Cluster CR"))
		}
	}

	// Admin Ext - RPC
	if adminAPIExternal != nil && adminAPIInternal != nil && adminAPIInternal.Port+1 == rpcPort {
		allErrs = append(allErrs,
			field.Invalid(rpcPortPath,
				rpcPort,
				"rpc port collides with external Admin API port that is not visible in the Cluster CR"))
	}

	// Admin Ext - Proxy
	if adminAPIExternal != nil && adminAPIInternal != nil && proxyAPIInternal != nil && adminAPIInternal.Port+1 == proxyAPIInternal.Port {
		allErrs = append(allErrs,
			field.Invalid(proxyPortPath,
				proxyAPIInternal.Port,
				"pandaproxy port collides with external Admin API port that is not visible in the Cluster CR"))
	}

	// Admin Ext - Kafka Ext
	for _, kafka := range kafkaListeners {
		if hasKafkaExternal && adminAPIExternal != nil && adminAPIInternal != nil && adminAPIInternal.Port+1 == kafka.Port+1 {
			allErrs = append(allErrs,
				field.Invalid(kafkaPortPath,
					kafka.Port,
					"kafka port collides with external Admin API port that is not visible in the Cluster CR"))
		}
	}

	// Admin Ext - Proxy Ext
	// The external proxy port is derived from the internal listener, so the
	// internal listener must be present as well; an external proxy listener
	// without an internal one must not crash the validation.
	if adminAPIExternal != nil && adminAPIInternal != nil &&
		proxyAPIExternal != nil && proxyAPIInternal != nil &&
		adminAPIInternal.Port+1 == proxyAPIInternal.Port+1 {
		allErrs = append(allErrs,
			field.Invalid(adminPortPath,
				adminAPIInternal.Port,
				"pandaproxy port collides with external Admin API port that is not visible in the Cluster CR"))
	}

	return allErrs
}
